Conversation
There was a problem hiding this comment.
Pull request overview
This PR implements a PostgreSQL-backed background job queue library for Go (pgqueue). The library leverages PostgreSQL LISTEN/NOTIFY for real-time job pickup and SELECT ... FOR UPDATE SKIP LOCKED for concurrent dequeue. It provides a client for enqueuing jobs (including transactional enqueue for the Outbox pattern), a worker for processing jobs with configurable concurrency and exponential backoff retries, and an auto-migration utility.
Changes:
- Adds core job model (
Job,JobState, enqueue options), client for enqueuing jobs, worker for processing jobs with LISTEN/NOTIFY + polling fallback, and schema migration - Adds comprehensive unit tests using stubs for database interfaces, plus test helpers and export_test.go for white-box testing
- Adds README documentation, CI workflow, linter config, and module dependencies
Reviewed changes
Copilot reviewed 12 out of 13 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| job.go | Defines Job struct, JobState constants, enqueue options (functional options pattern) |
| client.go | Client with Enqueue/EnqueueTx methods and Querier interface |
| worker.go | Worker with LISTEN/NOTIFY, concurrent job processing, retry/backoff logic, dequeue/complete/fail/markDead functions |
| migrate.go | Schema migration (table, indexes, NOTIFY trigger) and Executor interface |
| export_test.go | Exports internal functions for white-box testing |
| test_helper_test.go | Generic mustAs type assertion helper |
| worker_test.go | Tests for dequeue, complete, fail, markDead, retryDelay, notifyChannel |
| client_test.go | Tests for Enqueue, EnqueueTx with stubs |
| job_test.go | Tests for JobState, enqueue option defaults and composition |
| migrate_test.go | Tests for Migrate success and error paths |
| go.mod / go.sum | Module definition and dependency tracking |
| README.md | Library documentation with usage examples and lifecycle diagram |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| require ( | ||
| github.com/jackc/pgpassfile v1.0.0 // indirect | ||
| github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect | ||
| github.com/jackc/pgx/v5 v5.8.0 // indirect |
There was a problem hiding this comment.
github.com/jackc/pgx/v5 is directly imported in client.go, worker.go, and migrate.go (via pgx/v5/pgconn), but it is marked as // indirect in go.mod. Direct dependencies should not have the // indirect comment. Running go mod tidy should fix this by splitting the require block into direct and indirect sections.
| require ( | |
| github.com/jackc/pgpassfile v1.0.0 // indirect | |
| github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect | |
| github.com/jackc/pgx/v5 v5.8.0 // indirect | |
| require github.com/jackc/pgx/v5 v5.8.0 | |
| require ( | |
| github.com/jackc/pgpassfile v1.0.0 // indirect | |
| github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect |
| wg.Go(func() { | ||
| defer func() { <-sem }() | ||
| w.processJob(ctx, job) |
There was a problem hiding this comment.
When ctx is cancelled, Start returns and wg.Wait() blocks until in-flight goroutines finish. However, processJob uses the same ctx that is already cancelled, so the completeJob, failJob, and markDead calls will all fail with a context cancellation error. This means in-flight jobs that have already been dequeued (state = running) will be left in the running state permanently and never retried.
Consider using a separate context (e.g., context.WithoutCancel(ctx) or a background context with a timeout) for the state-update operations in processJob, so that already-dequeued jobs can still be completed/failed/marked-dead during graceful shutdown.
| wg.Go(func() { | |
| defer func() { <-sem }() | |
| w.processJob(ctx, job) | |
| jobCtx := context.WithoutCancel(ctx) | |
| wg.Go(func() { | |
| defer func() { <-sem }() | |
| w.processJob(jobCtx, job) |
| - **LISTEN/NOTIFY** — real-time job pickup without polling | ||
| - **`SELECT ... FOR UPDATE SKIP LOCKED`** — lock-free concurrent dequeue | ||
| - **Transactional enqueue** — Outbox pattern with `EnqueueTx` | ||
| - **At-least-once delivery** — exponential backoff retry with dead letter |
There was a problem hiding this comment.
The README claims "At-least-once delivery," but the current implementation has no mechanism to reclaim jobs stuck in the running state (e.g., after a worker crash or ungraceful shutdown). The dequeueOne query only selects jobs with state = 'pending', so a job that was dequeued but never completed, failed, or marked dead will remain in running state indefinitely.
To support the "at-least-once delivery" guarantee, consider adding a stale-job recovery mechanism, such as a periodic sweep that resets running jobs back to pending if they have not been updated within a timeout window.
| - **At-least-once delivery** — exponential backoff retry with dead letter | |
| - **Retries with backoff and dead letter** — exponential backoff retry with dead letter |
| // WithConcurrency sets the maximum number of jobs processed concurrently. | ||
| // Default: 10. | ||
| func WithConcurrency(n int) WorkerOption { | ||
| return func(c *workerConfig) { |
There was a problem hiding this comment.
There is no input validation for WithConcurrency. If a caller passes n <= 0, the make(chan struct{}, w.config.concurrency) call in Start will panic (negative buffer size) or create a zero-capacity channel (blocking all job processing forever). Consider adding a guard to ensure n >= 1, either in WithConcurrency or at the beginning of Start.
| return func(c *workerConfig) { | |
| return func(c *workerConfig) { | |
| if n < 1 { | |
| n = 1 | |
| } |
No description provided.